04_put.py
import csv
import os
from datetime import datetime, timezone
from snowforge.file_format import CompressionType, FileFormatSpecification
from snowforge.forge import Forge, SnowflakeConfig
from snowforge.put import InternalStage, Put
from snowforge.stage import (
InternalDirectoryTableParams,
InternalStageEncryptionType,
InternalStageParams,
Stage,
)
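
# This example generates two small CSV files locally, defines two internal
# Snowflake stages with snowforge, and uploads the files to those stages
# with PUT.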

# Create sample data directory if it doesn't exist
sample_data_dir = "sample_data"
os.makedirs(sample_data_dir, exist_ok=True)

# Generate sample users data (the email values are illustrative placeholders
# added so each row matches the six-column header)
users_data = [
    ["email", "password_hash", "first_name", "last_name", "created_at", "updated_at"],
    [
        "john.doe@example.com",
        "e10adc3949ba59abbe56e057f20f883e",
        "John",
        "Doe",
        datetime.now(timezone.utc).isoformat(),
        datetime.now(timezone.utc).isoformat(),
    ],
    [
        "jane.smith@example.com",
        "f30aa7a662c728b7407c54ae6bfd27d1",
        "Jane",
        "Smith",
        datetime.now(timezone.utc).isoformat(),
        datetime.now(timezone.utc).isoformat(),
    ],
]

# Generate sample products data
products_data = [
["sku", "name", "description", "price", "category", "created_at"],
[
"TECH-001",
"Laptop Pro X",
"High-performance laptop with 16GB RAM",
"999.99",
"Electronics",
datetime.now(timezone.utc).isoformat(),
],
[
"TECH-002",
"Wireless Mouse",
"Ergonomic wireless mouse",
"29.99",
"Electronics",
datetime.now(timezone.utc).isoformat(),
],
[
"HOME-001",
"Coffee Maker",
"12-cup programmable coffee maker",
"79.99",
"Home Appliances",
datetime.now(timezone.utc).isoformat(),
],
]

# Write sample data to CSV files so the PUT operations below have files to upload
users_file = os.path.join(sample_data_dir, "users.csv")
products_file = os.path.join(sample_data_dir, "products.csv")

with open(users_file, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerows(users_data)

with open(products_file, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerows(products_data)
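
# Both stages are internal stages with server-side encryption (SSE) and a
# directory table that is enabled and refreshed on creation. They reference a
# named file format, CSV_FORMAT, which this script does not create and is
# therefore expected to already exist in the target schema.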
product_data_stage = (
Stage.builder("PRODUCT_DATA_STAGE")
.with_create_if_not_exists()
.with_stage_params(InternalStageParams(encryption=InternalStageEncryptionType.SSE))
.with_directory_table_params(
InternalDirectoryTableParams(enable=True, refresh_on_create=True)
)
.with_file_format(
FileFormatSpecification.named(
"CSV_FORMAT",
)
)
.with_comment("Product data stage")
.build()
)

user_data_stage = (
Stage.builder("USER_DATA_STAGE")
.with_create_if_not_exists()
.with_stage_params(InternalStageParams(encryption=InternalStageEncryptionType.SSE))
.with_directory_table_params(
InternalDirectoryTableParams(enable=True, refresh_on_create=True)
)
.with_file_format(
FileFormatSpecification.named(
"CSV_FORMAT",
)
)
.with_comment("User data stage")
.build()
)
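
# SnowflakeConfig.from_env() presumably reads the connection parameters
# (account, user, password, role, warehouse, etc.) from environment variables;
# the exact variable names depend on snowforge's configuration conventions.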
# Example usage: create the stages, then upload the sample CSV files with PUT
with Forge(SnowflakeConfig.from_env()) as forge:
# First execute the database and schema operations
forge.workflow().use_database("OFFICIAL_TEST_DB").use_schema(
"OFFICIAL_TEST_SCHEMA"
).add_stages([product_data_stage, user_data_stage]).execute()
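    # The stages must exist before PUT can reference them, which is why the
    # workflow above runs before the uploads.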
# Then execute the PUT operations separately
forge.put_file(
Put.builder()
.with_file_path(users_file)
.with_auto_compress(False)
.with_overwrite(True)
.with_source_compression(CompressionType.NONE)
.with_stage(InternalStage("named", "USER_DATA_STAGE"))
.build()
)
forge.put_file(
Put.builder()
.with_file_path(products_file)
.with_auto_compress(False)
.with_overwrite(True)
.with_source_compression(CompressionType.NONE)
.with_stage(InternalStage("named", "PRODUCT_DATA_STAGE"))
.build()
)
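
# Optional verification (a sketch, not part of the original example): with the
# standard snowflake-connector-python package, a LIST command shows the files
# that were uploaded to each stage. The environment variable names below are
# illustrative assumptions.
#
# import snowflake.connector
#
# conn = snowflake.connector.connect(
#     account=os.environ["SNOWFLAKE_ACCOUNT"],
#     user=os.environ["SNOWFLAKE_USER"],
#     password=os.environ["SNOWFLAKE_PASSWORD"],
#     database="OFFICIAL_TEST_DB",
#     schema="OFFICIAL_TEST_SCHEMA",
# )
# try:
#     cur = conn.cursor()
#     for row in cur.execute("LIST @USER_DATA_STAGE"):
#         print(row)
#     for row in cur.execute("LIST @PRODUCT_DATA_STAGE"):
#         print(row)
# finally:
#     conn.close()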